-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-1305: Support persisting RDD's directly to Tachyon #158
Conversation
The PR is moved here, and the left commits will be push to this site then. |
Merged build triggered. |
Merged build started. |
Merged build finished. |
One or more automated tests failed |
… PR#468 of apache-incubator-spark to the apache-spark
Merged build triggered. |
Merged build started. |
Merged build finished. |
One or more automated tests failed |
@@ -97,6 +101,23 @@ private[spark] class BlockManager( | |||
var asyncReregisterTask: Future[Unit] = null | |||
val asyncReregisterLock = new Object | |||
|
|||
private def tachyonStore : TachyonStore = synchronized { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you make this a lazy val, it will have the same effect, be more efficient, and be less code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, here it's fine to use lazy val.
This seems to be failing tests, have you run them locally? |
Hey @RongGu we've made some changes to the block store that I think will require up-merging this patch to master. We're interested in merging this soon. Any chance you could take a look at this and bring it up to date? Thanks! |
Hey @mateiz . Yes, I verified the whole thing last night. I set up a tachyon locally and the example I added using tachyonstore works well. Also, the BlockManagerSuite with my added test passed here. |
Hey, @pwendell . Thanks for your notification. I've been busy with the school stuff here since this semester started. There is still a bit work need to done in this PR. I guess several other PRs will be merged into trunk in next a few days. Thus, I would like to perform the up-merging work when I have done with the feature in this PR and before you think it's okay to merge this PR. In a word, I would like to perform the up-merging work at last. Is this plan work for you? |
Build triggered. |
Build started. |
Build finished. |
One or more automated tests failed |
Build triggered. |
Build started. |
Build finished. |
One or more automated tests failed |
Jenkins, test this please |
Build triggered. |
Build started. |
boolean useTachyon, | ||
boolean deserialized, | ||
int replication) { | ||
return StorageLevel.apply(useDisk, useMemory, useTachyon, deserialized, replication); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't just delete the old method -- add a second version that takes the new useTachyon argument. Some user code will be using the old method and there's no reason to break it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13781/ |
Merged build triggered. |
Merged build started. |
Merged build finished. All automated tests passed. |
All automated tests passed. |
@mateiz Merged the master branch. |
HY, it still doesn't merge. I see Maybe you didn't update to the very latest one? Or maybe you got unlucky. |
Merged build triggered. |
Merged build started. |
Merged build finished. All automated tests passed. |
All automated tests passed. |
@mateiz See if it works now :) |
Merging cleanly now. I merged this - thanks! |
No worries, thanks a lot to both you and Haoyuan for putting this together! This will be a great feature to have. |
Move the PR#468 of apache-incubator-spark to the apache-spark "Adding an option to persist Spark RDD blocks into Tachyon." Author: Haoyuan Li <[email protected]> Author: RongGu <[email protected]> Closes apache#158 from RongGu/master and squashes the following commits: 72b7768 [Haoyuan Li] merge master 9f7fa1b [Haoyuan Li] fix code style ae7834b [Haoyuan Li] minor cleanup a8b3ec6 [Haoyuan Li] merge master branch e0f4891 [Haoyuan Li] better check offheap. 55b5918 [RongGu] address matei's comment on the replication of offHeap storagelevel 7cd4600 [RongGu] remove some logic code for tachyonstore's replication 51149e7 [RongGu] address aaron's comment on returning value of the remove() function in tachyonstore 8adfcfa [RongGu] address arron's comment on inTachyonSize 120e48a [RongGu] changed the root-level dir name in Tachyon 5cc041c [Haoyuan Li] address aaron's comments 9b97935 [Haoyuan Li] address aaron's comments d9a6438 [Haoyuan Li] fix for pspark 77d2703 [Haoyuan Li] change python api.git status 3dcace4 [Haoyuan Li] address matei's comments 91fa09d [Haoyuan Li] address patrick's comments 589eafe [Haoyuan Li] use TRY_CACHE instead of MUST_CACHE 64348b2 [Haoyuan Li] update conf docs. ed73e19 [Haoyuan Li] Merge branch 'master' of github.com:RongGu/spark-1 619a9a8 [RongGu] set number of directories in TachyonStore back to 64; added a TODO tag for duplicated code from the DiskStore be79d77 [RongGu] find a way to clean up some unnecessay metods and classed to make the code simpler 49cc724 [Haoyuan Li] update docs with off_headp option 4572f9f [RongGu] reserving the old apply function API of StorageLevel 04301d3 [RongGu] rename StorageLevel.TACHYON to Storage.OFF_HEAP c9aeabf [RongGu] rename the StorgeLevel.TACHYON as StorageLevel.OFF_HEAP 76805aa [RongGu] unifies the config properties name prefix; add the configs into docs/configuration.md e700d9c [RongGu] add the SparkTachyonHdfsLR example and some comments fd84156 [RongGu] use randomUUID to generate sparkapp directory name on tachyon;minor code style fix 939e467 [Haoyuan Li] 0.4.1-thrift from maven central 86a2eab [Haoyuan Li] tachyon 0.4.1-thrift is in the staging repo. but jenkins failed to download it. temporarily revert it back to 0.4.1 16c5798 [RongGu] make the dependency on tachyon as tachyon-0.4.1-thrift eacb2e8 [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1 bbeb4de [RongGu] fix the JsonProtocolSuite test failure problem 6adb58f [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1 d827250 [RongGu] fix JsonProtocolSuie test failure 716e93b [Haoyuan Li] revert the version ca14469 [Haoyuan Li] bump tachyon version to 0.4.1-thrift 2825a13 [RongGu] up-merging to the current master branch of the apache spark 6a22c1a [Haoyuan Li] fix scalastyle 8968b67 [Haoyuan Li] exclude more libraries from tachyon dependency to be the same as referencing tachyon-client. 77be7e8 [RongGu] address mateiz's comment about the temp folder name problem. The implementation followed mateiz's advice. 1dcadf9 [Haoyuan Li] typo bf278fa [Haoyuan Li] fix python tests e82909c [Haoyuan Li] minor cleanup 776a56c [Haoyuan Li] address patrick's and ali's comments from the previous PR 8859371 [Haoyuan Li] various minor fixes and clean up e3ddbba [Haoyuan Li] add doc to use Tachyon cache mode. fcaeab2 [Haoyuan Li] address Aaron's comment e554b1e [Haoyuan Li] add python code 47304b3 [Haoyuan Li] make tachyonStore in BlockMananger lazy val; add more comments StorageLevels. dc8ef24 [Haoyuan Li] add old storelevel constructor e01a271 [Haoyuan Li] update tachyon 0.4.1 8011a96 [RongGu] fix a brought-in mistake in StorageLevel 70ca182 [RongGu] a bit change in comment 556978b [RongGu] fix the scalastyle errors 791189b [RongGu] "Adding an option to persist Spark RDD blocks into Tachyon." move the PR#468 of apache-incubator-spark to the apache-spark
## What changes were proposed in this pull request? The ACL Thrift end to end test currently fail because they do not set the `spark.databricks.acl.enabled` flag. This PR fixes that. ## How was this patch tested? This is a test. Author: Herman van Hovell <[email protected]> Closes apache#158 from hvanhovell/SC-5365-thrift.
### What changes were proposed in this pull request? This PR adds a new optimizer rule `MergeScalarSubqueries` to merge multiple non-correlated `ScalarSubquery`s to compute multiple scalar values once. E.g. the following query: ``` SELECT (SELECT avg(a) FROM t), (SELECT sum(b) FROM t) ``` is optimized from: ``` == Optimized Logical Plan == Project [scalar-subquery#242 [] AS scalarsubquery()#253, scalar-subquery#243 [] AS scalarsubquery()#254L] : :- Aggregate [avg(a#244) AS avg(a)#247] : : +- Project [a#244] : : +- Relation default.t[a#244,b#245] parquet : +- Aggregate [sum(a#251) AS sum(a)#250L] : +- Project [a#251] : +- Relation default.t[a#251,b#252] parquet +- OneRowRelation ``` to: ``` == Optimized Logical Plan == Project [scalar-subquery#242 [].avg(a) AS scalarsubquery()#253, scalar-subquery#243 [].sum(a) AS scalarsubquery()#254L] : :- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] : : +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L] : : +- Project [a#244] : : +- Relation default.t[a#244,b#245] parquet : +- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] : +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L] : +- Project [a#244] : +- Relation default.t[a#244,b#245] parquet +- OneRowRelation ``` and in the physical plan subqueries are reused: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(1) Project [Subquery subquery#242, [id=#113].avg(a) AS scalarsubquery()#253, ReusedSubquery Subquery subquery#242, [id=#113].sum(a) AS scalarsubquery()#254L] : :- Subquery subquery#242, [id=#113] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] +- *(2) HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#158] +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L]) +- *(1) ColumnarToRow +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int> +- == Initial Plan == Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] +- HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#110] +- HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L]) +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int> : +- ReusedSubquery Subquery subquery#242, [id=#113] +- *(1) Scan OneRowRelation[] +- == Initial Plan == ... ``` Please note that the above simple example could be easily optimized into a common select expression without reuse node, but this PR can handle more complex queries as well. ### Why are the changes needed? Performance improvement. ``` [info] TPCDS Snappy: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] q9 - MergeScalarSubqueries off 50798 52521 1423 0.0 Infinity 1.0X [info] q9 - MergeScalarSubqueries on 19484 19675 226 0.0 Infinity 2.6X [info] TPCDS Snappy: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] q9b - MergeScalarSubqueries off 15430 17803 NaN 0.0 Infinity 1.0X [info] q9b - MergeScalarSubqueries on 3862 4002 196 0.0 Infinity 4.0X ``` Please find `q9b` in the description of SPARK-34079. It is a variant of [q9.sql](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q9.sql) using CTE. The performance improvement in case of `q9` comes from merging 15 subqueries into 5 and in case of `q9b` it comes from merging 5 subqueries into 1. ### Does this PR introduce _any_ user-facing change? No. But this optimization can be disabled with `spark.sql.optimizer.excludedRules` config. ### How was this patch tested? Existing and new UTs. Closes #32298 from peter-toth/SPARK-34079-multi-column-scalar-subquery. Lead-authored-by: Peter Toth <[email protected]> Co-authored-by: attilapiros <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request? This PR adds a new optimizer rule `MergeScalarSubqueries` to merge multiple non-correlated `ScalarSubquery`s to compute multiple scalar values once. E.g. the following query: ``` SELECT (SELECT avg(a) FROM t), (SELECT sum(b) FROM t) ``` is optimized from: ``` == Optimized Logical Plan == Project [scalar-subquery#242 [] AS scalarsubquery()#253, scalar-subquery#243 [] AS scalarsubquery()#254L] : :- Aggregate [avg(a#244) AS avg(a)#247] : : +- Project [a#244] : : +- Relation default.t[a#244,b#245] parquet : +- Aggregate [sum(a#251) AS sum(a)#250L] : +- Project [a#251] : +- Relation default.t[a#251,b#252] parquet +- OneRowRelation ``` to: ``` == Optimized Logical Plan == Project [scalar-subquery#242 [].avg(a) AS scalarsubquery()#253, scalar-subquery#243 [].sum(a) AS scalarsubquery()#254L] : :- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] : : +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L] : : +- Project [a#244] : : +- Relation default.t[a#244,b#245] parquet : +- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] : +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L] : +- Project [a#244] : +- Relation default.t[a#244,b#245] parquet +- OneRowRelation ``` and in the physical plan subqueries are reused: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(1) Project [Subquery subquery#242, [id=#113].avg(a) AS scalarsubquery()#253, ReusedSubquery Subquery subquery#242, [id=#113].sum(a) AS scalarsubquery()#254L] : :- Subquery subquery#242, [id=#113] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] +- *(2) HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#158] +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L]) +- *(1) ColumnarToRow +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int> +- == Initial Plan == Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260] +- HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#110] +- HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L]) +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int> : +- ReusedSubquery Subquery subquery#242, [id=#113] +- *(1) Scan OneRowRelation[] +- == Initial Plan == ... ``` Please note that the above simple example could be easily optimized into a common select expression without reuse node, but this PR can handle more complex queries as well. ### Why are the changes needed? Performance improvement. ``` [info] TPCDS Snappy: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] q9 - MergeScalarSubqueries off 50798 52521 1423 0.0 Infinity 1.0X [info] q9 - MergeScalarSubqueries on 19484 19675 226 0.0 Infinity 2.6X [info] TPCDS Snappy: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] q9b - MergeScalarSubqueries off 15430 17803 NaN 0.0 Infinity 1.0X [info] q9b - MergeScalarSubqueries on 3862 4002 196 0.0 Infinity 4.0X ``` Please find `q9b` in the description of SPARK-34079. It is a variant of [q9.sql](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q9.sql) using CTE. The performance improvement in case of `q9` comes from merging 15 subqueries into 5 and in case of `q9b` it comes from merging 5 subqueries into 1. ### Does this PR introduce _any_ user-facing change? No. But this optimization can be disabled with `spark.sql.optimizer.excludedRules` config. ### How was this patch tested? Existing and new UTs. Closes #32298 from peter-toth/SPARK-34079-multi-column-scalar-subquery. Lead-authored-by: Peter Toth <[email protected]> Co-authored-by: attilapiros <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit e00b81e) Signed-off-by: Wenchen Fan <[email protected]>
Move the PR#468 of apache-incubator-spark to the apache-spark
"Adding an option to persist Spark RDD blocks into Tachyon."